<?php

namespace app\api\model;



use PhpAmqpLib\Connection\AMQPStreamConnection;
use think\Model;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use think\Db;

/**
 * Publisher for delayed RabbitMQ messages, built on php-amqplib and an
 * exchange of type "x-delayed-message".
 *
 * Constructing an instance opens a broker connection, declares the delayed
 * exchange and a durable queue, and binds the two with the given routing key.
 * push() then publishes one message using publisher confirms and closes the
 * channel and the connection, so an instance can serve a single push() call.
 *
 * Created by PhpStorm.
 * User: pandeng
 * Date: 2017-07-26
 * Time: 21:51
 */
class ccPush extends Model
{
    // NOTE(review): broker address and credentials are hard-coded in source.
    // They should be moved to environment/configuration and the password
    // rotated; the values are left untouched here because the constants are
    // part of this class's public interface.
    const HOST = '192.168.0.18';
    const PORT = '5672';
    const LOGIN = 'admin';
    const PASSWORD = 'Yiniao3137';
    const VHOST = '/psi';

    /**
     * Exchange name. Defaults to 'delay_topic'; replaced by the constructor's
     * $exchangeName argument when that argument is non-empty.
     */
    public $exchangeName = 'delay_topic';

    /** Delay applied to each published message, in seconds. */
    public $delaySecond = 0;

    /** AMQP channel opened by the constructor and closed by push(). */
    public $channel;

    /** The most recent AMQPMessage built by createDelayMsg(), or null. */
    public $msg;

    /**
     * Broker connection owned by this instance.
     *
     * Kept in its own declared property instead of `$this->connection`, which
     * this class never declared. NOTE(review): think\Model is believed to use
     * a `connection` member for its database settings - confirm against the
     * framework version in use.
     */
    protected $amqpConnection;

    /**
     * Connect to the broker and make sure exchange, queue and binding exist.
     *
     * NOTE(review): the parent constructor is not invoked, exactly as in the
     * previous revision; confirm that this is intentional.
     *
     * @param int|float|string $time           Delay in seconds for messages sent by this instance.
     * @param string           $delayQueueName Queue to declare (durable) and bind.
     * @param string           $exchangeName   Exchange to declare and bind the queue to.
     * @param string           $delayRouteKey  Routing key used for the binding.
     */
    public function __construct($time, $delayQueueName, $exchangeName, $delayRouteKey)
    {
        $this->delaySecond = $time;

        // The queue is bound to the exchange passed in by the caller, so that
        // is the exchange that has to be declared. Previously the default
        // name was always declared, which left the binding pointing at an
        // exchange this class never created whenever the two names differed.
        if ($exchangeName !== null && $exchangeName !== '') {
            $this->exchangeName = $exchangeName;
        }

        // Open the connection and a channel on it.
        $this->amqpConnection = new AMQPStreamConnection(
            self::HOST,
            self::PORT,
            self::LOGIN,
            self::PASSWORD,
            self::VHOST
        );
        $this->channel = $this->amqpConnection->channel();

        $this->channel->exchange_declare(
            $this->exchangeName,
            // Exchange type provided by the delayed-message plugin.
            'x-delayed-message',
            false, // passive
            true,  // durable
            false, // auto_delete
            false, // internal
            false, // nowait
            // Important: the arguments must be wrapped in an AMQPTable.
            new AMQPTable([
                'x-delayed-type' => 'direct',
            ])
        );

        // Declare the queue: not passive, durable, not exclusive, not auto-deleted.
        $this->channel->queue_declare($delayQueueName, false, true, false, false);

        // Bind the queue to the exchange declared above.
        $this->channel->queue_bind($delayQueueName, $this->exchangeName, $delayRouteKey);
    }

    /**
     * Build a persistent message carrying the configured delay.
     *
     * @param string $data Message body.
     *
     * @return AMQPMessage
     */
    private function createDelayMsg($data)
    {
        $this->msg = new AMQPMessage(
            $data,
            [
                'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT,
                // The delay travels in the "x-delay" header, in milliseconds
                // (1 s = 1000 ms). Cast to int so a fractional $delaySecond
                // does not produce a non-integer header value.
                'application_headers' => new AMQPTable([
                    'x-delay' => (int) ($this->delaySecond * 1000),
                ]),
            ]
        );

        return $this->msg;
    }

    /**
     * Publish one delayed message and wait for the broker's confirmation.
     *
     * The outcome reported by the broker (ack or nack) is written to the
     * `log` table. The channel and connection are closed before returning,
     * also when publishing fails, so the instance cannot be reused afterwards.
     *
     * @param string $msg_data       Message body.
     * @param string $delayQueueName Not used by this method; kept for interface compatibility.
     * @param string $exchangeName   Exchange to publish to.
     * @param string $delayRouteKey  Routing key to publish with.
     *
     * @return void
     */
    public function push($msg_data, $delayQueueName, $exchangeName, $delayRouteKey)
    {
        // Broker confirmed the message.
        $this->channel->set_ack_handler(
            function (AMQPMessage $message) {
                $this->logPublishResult($message, '发送成功');
            }
        );
        // Broker rejected the message.
        $this->channel->set_nack_handler(
            function (AMQPMessage $message) {
                $this->logPublishResult($message, '发送失败');
            }
        );

        try {
            $msg = $this->createDelayMsg($msg_data);
            $this->channel->confirm_select();
            $this->channel->basic_publish($msg, $exchangeName, $delayRouteKey);
            $this->channel->wait_for_pending_acks();
        } catch (\Exception $e) {
            // Do not leave the channel/connection open on failure, and do not
            // let a secondary close error hide the original exception.
            $this->releaseQuietly();
            throw $e;
        }

        $this->channel->close();
        $this->amqpConnection->close();
    }

    /**
     * Record the result of a publish attempt in the `log` table.
     *
     * @param AMQPMessage $message Message the broker acked or nacked.
     * @param string      $result  Text stored in the `msg` column.
     *
     * @return void
     */
    private function logPublishResult(AMQPMessage $message, $result)
    {
        Db::connect(config('database.zong'))->table('log')->insertGetId([
            'admin_id' => 0,
            'created_at' => time(),
            'content' => $message->body,
            'type' => 1,
            'msg' => $result,
        ]);
    }

    /**
     * Close channel and connection, ignoring errors raised while closing.
     * Used only on failure paths where another exception is already in flight.
     *
     * @return void
     */
    private function releaseQuietly()
    {
        try {
            if ($this->channel) {
                $this->channel->close();
            }
        } catch (\Exception $ignored) {
            // Best effort: the original failure is what the caller needs to see.
        }

        try {
            if ($this->amqpConnection) {
                $this->amqpConnection->close();
            }
        } catch (\Exception $ignored) {
            // Best effort, see above.
        }
    }
}


